Run the dependent pipelines


In [49]:
%%bash
# Abort on the first failing command, so that dpp is never started from the
# wrong directory if /pipelines does not exist.
set -e
cd /pipelines
# The two environment variables apply to this dpp invocation only;
# KNESSET_DATASERVICE_INCREMENTAL is deliberately set to an empty value.
# NOTE(review): judging by the captured log, KNESSET_LOAD_FROM_URL=1 makes the
# pipelines load the published data from storage.googleapis.com - confirm
# against the pipeline specs.
KNESSET_LOAD_FROM_URL=1 KNESSET_DATASERVICE_INCREMENTAL= \
  dpp run --no-use-cache --concurrency 2 --verbose \
    ./committees/kns_committeesession,./members/mk_individual


[./committees/kns_committeesession:T_0] >>> INFO    :cba22646 RUNNING ./committees/kns_committeesession
[./committees/kns_committeesession:T_0] >>> INFO    :cba22646 Collecting dependencies
[./committees/kns_committeesession:T_0] >>> INFO    :cba22646 Running async task
[./committees/kns_committeesession:T_0] >>> INFO    :cba22646 Waiting for completion
[./committees/kns_committeesession:T_0] >>> INFO    :cba22646 Async task starting
[./committees/kns_committeesession:T_0] >>> INFO    :cba22646 Building process chain:
[./committees/kns_committeesession:T_0] >>> INFO    :- load_resource
[./committees/kns_committeesession:T_0] >>> INFO    :- knesset.dump_to_path
[./committees/kns_committeesession:T_0] >>> INFO    :- knesset.dump_to_sql
[./committees/kns_committeesession:T_0] >>> INFO    :- (sink)
[./members/mk_individual:T_1] >>> INFO    :e2d6f365 RUNNING ./members/mk_individual
[./members/mk_individual:T_1] >>> INFO    :e2d6f365 Collecting dependencies
[./members/mk_individual:T_1] >>> INFO    :e2d6f365 Running async task
[./members/mk_individual:T_1] >>> INFO    :e2d6f365 Waiting for completion
[./members/mk_individual:T_1] >>> INFO    :e2d6f365 Async task starting
[./members/mk_individual:T_1] >>> INFO    :e2d6f365 Building process chain:
[./members/mk_individual:T_1] >>> INFO    :- load_resource
[./members/mk_individual:T_1] >>> INFO    :- knesset.dump_to_path
[./members/mk_individual:T_1] >>> INFO    :- knesset.dump_to_sql
[./committees/kns_committeesession:T_0] >>> INFO    :load_resource: DEBUG   :Starting new HTTP connection (1): storage.googleapis.com:80
[./committees/kns_committeesession:T_0] >>> INFO    :load_resource: DEBUG   :http://storage.googleapis.com:80 "GET /knesset-data-pipelines/data/committees/kns_committeesession/datapackage.json HTTP/1.1" 200 3751
[./members/mk_individual:T_1] >>> INFO    :- (sink)
[./committees/kns_committeesession:T_0] >>> INFO    :load_resource: DEBUG   :Starting new HTTP connection (1): storage.googleapis.com:80
[./committees/kns_committeesession:T_0] >>> INFO    :load_resource: DEBUG   :http://storage.googleapis.com:80 "GET /knesset-data-pipelines/data/committees/kns_committeesession/datapackage.json HTTP/1.1" 200 3751
[./members/mk_individual:T_1] >>> INFO    :load_resource: DEBUG   :Starting new HTTP connection (1): storage.googleapis.com:80
[./members/mk_individual:T_1] >>> INFO    :load_resource: DEBUG   :http://storage.googleapis.com:80 "GET /knesset-data-pipelines/data/members/mk_individual/datapackage.json HTTP/1.1" 200 14563
[./committees/kns_committeesession:T_0] >>> INFO    :load_resource: DEBUG   :Starting new HTTP connection (1): storage.googleapis.com:80
[./committees/kns_committeesession:T_0] >>> INFO    :load_resource: DEBUG   :http://storage.googleapis.com:80 "GET /knesset-data-pipelines/data/committees/kns_committeesession/kns_committeesession.csv HTTP/1.1" 200 41799011
[./members/mk_individual:T_1] >>> INFO    :load_resource: DEBUG   :Starting new HTTP connection (1): storage.googleapis.com:80
[./members/mk_individual:T_1] >>> INFO    :load_resource: DEBUG   :http://storage.googleapis.com:80 "GET /knesset-data-pipelines/data/members/mk_individual/datapackage.json HTTP/1.1" 200 14563
[./committees/kns_committeesession:T_0] >>> INFO    :load_resource: DEBUG   :Starting new HTTP connection (2): storage.googleapis.com:80
[./members/mk_individual:T_1] >>> INFO    :load_resource: DEBUG   :Starting new HTTP connection (1): storage.googleapis.com:80
[./members/mk_individual:T_1] >>> INFO    :load_resource: DEBUG   :http://storage.googleapis.com:80 "GET /knesset-data-pipelines/data/members/mk_individual/mk_individual_positions.csv HTTP/1.1" 200 7154433
[./members/mk_individual:T_1] >>> INFO    :load_resource: DEBUG   :Starting new HTTP connection (2): storage.googleapis.com:80
[./committees/kns_committeesession:T_0] >>> INFO    :load_resource: DEBUG   :http://storage.googleapis.com:80 "GET /knesset-data-pipelines/data/committees/kns_committeesession/kns_committeesession.csv HTTP/1.1" 200 41799011
[./committees/kns_committeesession:T_0] >>> INFO    :load_resource: INFO    :loaded 10000 rows
[./committees/kns_committeesession:T_0] >>> INFO    :load_resource: INFO    :loaded 20000 rows
[./committees/kns_committeesession:T_0] >>> INFO    :load_resource: INFO    :loaded 30000 rows
[./committees/kns_committeesession:T_0] >>> INFO    :load_resource: INFO    :loaded 40000 rows
[./committees/kns_committeesession:T_0] >>> INFO    :load_resource: INFO    :loaded 50000 rows
[./committees/kns_committeesession:T_0] >>> INFO    :load_resource: INFO    :loaded 60000 rows
[./committees/kns_committeesession:T_0] >>> INFO    :load_resource: INFO    :loaded 70000 rows
[./committees/kns_committeesession:T_0] >>> INFO    :load_resource: INFO    :Processed 74409 rows
[./committees/kns_committeesession:T_0] >>> INFO    :cba22646 DONE /usr/local/lib/python3.6/site-packages/datapackage_pipelines/specs/../lib/load_resource.py
[./committees/kns_committeesession:T_0] >>> INFO    :knesset.dump_to_path: INFO    :Processed 74409 rows
[./committees/kns_committeesession:T_0] >>> INFO    :knesset.dump_to_sql: INFO    :Processed 74409 rows
[./committees/kns_committeesession:T_0] >>> INFO    :cba22646 DONE /usr/local/lib/python3.6/site-packages/datapackage_pipelines/manager/../lib/internal/sink.py
[./committees/kns_committeesession:T_0] >>> INFO    :cba22646 DONE /pipelines/datapackage_pipelines_knesset/processors/dump_to_path.py
[./committees/kns_committeesession:T_0] >>> INFO    :cba22646 DONE /pipelines/datapackage_pipelines_knesset/processors/dump_to_sql.py
[./committees/kns_committeesession:T_0] >>> INFO    :cba22646 DONE V ./committees/kns_committeesession {'.dpp': {'out-datapackage-url': '../data/committees/kns_committeesession/datapackage.json'}, 'bytes': None, 'count_of_rows': 74409, 'dataset_name': '_', 'hash': '69ff9c2cc04646502e81a5dc795f85ea'}
[./members/mk_individual:T_1] >>> INFO    :load_resource: DEBUG   :http://storage.googleapis.com:80 "GET /knesset-data-pipelines/data/members/mk_individual/mk_individual_positions.csv HTTP/1.1" 200 7154433
[./members/mk_individual:T_1] >>> INFO    :load_resource: DEBUG   :Starting new HTTP connection (1): storage.googleapis.com:80
[./members/mk_individual:T_1] >>> INFO    :load_resource: DEBUG   :http://storage.googleapis.com:80 "GET /knesset-data-pipelines/data/members/mk_individual/mk_individual.csv HTTP/1.1" 200 233706
[./members/mk_individual:T_1] >>> INFO    :load_resource: DEBUG   :Starting new HTTP connection (2): storage.googleapis.com:80
[./members/mk_individual:T_1] >>> INFO    :load_resource: DEBUG   :http://storage.googleapis.com:80 "GET /knesset-data-pipelines/data/members/mk_individual/mk_individual.csv HTTP/1.1" 200 233706
[./members/mk_individual:T_1] >>> INFO    :load_resource: DEBUG   :Starting new HTTP connection (1): storage.googleapis.com:80
[./members/mk_individual:T_1] >>> INFO    :load_resource: DEBUG   :http://storage.googleapis.com:80 "GET /knesset-data-pipelines/data/members/mk_individual/kns_knessetdates.csv HTTP/1.1" 200 14303
[./members/mk_individual:T_1] >>> INFO    :load_resource: DEBUG   :Starting new HTTP connection (2): storage.googleapis.com:80
[./members/mk_individual:T_1] >>> INFO    :load_resource: DEBUG   :http://storage.googleapis.com:80 "GET /knesset-data-pipelines/data/members/mk_individual/kns_knessetdates.csv HTTP/1.1" 200 14303
[./members/mk_individual:T_1] >>> INFO    :load_resource: DEBUG   :Starting new HTTP connection (1): storage.googleapis.com:80
[./members/mk_individual:T_1] >>> INFO    :load_resource: DEBUG   :http://storage.googleapis.com:80 "GET /knesset-data-pipelines/data/members/mk_individual/mk_individual_names.csv HTTP/1.1" 200 50721
[./members/mk_individual:T_1] >>> INFO    :load_resource: DEBUG   :Starting new HTTP connection (2): storage.googleapis.com:80
[./members/mk_individual:T_1] >>> INFO    :load_resource: DEBUG   :http://storage.googleapis.com:80 "GET /knesset-data-pipelines/data/members/mk_individual/mk_individual_names.csv HTTP/1.1" 200 50721
[./members/mk_individual:T_1] >>> INFO    :load_resource: DEBUG   :Starting new HTTP connection (1): storage.googleapis.com:80
[./members/mk_individual:T_1] >>> INFO    :load_resource: DEBUG   :http://storage.googleapis.com:80 "GET /knesset-data-pipelines/data/members/mk_individual/mk_individual_factions.csv HTTP/1.1" 200 184671
[./members/mk_individual:T_1] >>> INFO    :load_resource: DEBUG   :Starting new HTTP connection (2): storage.googleapis.com:80
[./members/mk_individual:T_1] >>> INFO    :load_resource: DEBUG   :http://storage.googleapis.com:80 "GET /knesset-data-pipelines/data/members/mk_individual/mk_individual_factions.csv HTTP/1.1" 200 184671
[./members/mk_individual:T_1] >>> INFO    :load_resource: DEBUG   :Starting new HTTP connection (1): storage.googleapis.com:80
[./members/mk_individual:T_1] >>> INFO    :load_resource: DEBUG   :http://storage.googleapis.com:80 "GET /knesset-data-pipelines/data/members/mk_individual/mk_individual_faction_chairpersons.csv HTTP/1.1" 200 5484
[./members/mk_individual:T_1] >>> INFO    :load_resource: DEBUG   :http://storage.googleapis.com:80 "GET /knesset-data-pipelines/data/members/mk_individual/mk_individual_faction_chairpersons.csv HTTP/1.1" 200 5484
[./members/mk_individual:T_1] >>> INFO    :load_resource: DEBUG   :Starting new HTTP connection (1): storage.googleapis.com:80
[./members/mk_individual:T_1] >>> INFO    :load_resource: DEBUG   :http://storage.googleapis.com:80 "GET /knesset-data-pipelines/data/members/mk_individual/mk_individual_committees.csv HTTP/1.1" 200 1019784
[./members/mk_individual:T_1] >>> INFO    :load_resource: DEBUG   :Starting new HTTP connection (2): storage.googleapis.com:80
[./members/mk_individual:T_1] >>> INFO    :load_resource: DEBUG   :http://storage.googleapis.com:80 "GET /knesset-data-pipelines/data/members/mk_individual/mk_individual_committees.csv HTTP/1.1" 200 1019784
[./members/mk_individual:T_1] >>> INFO    :load_resource: DEBUG   :Starting new HTTP connection (1): storage.googleapis.com:80
[./members/mk_individual:T_1] >>> INFO    :load_resource: DEBUG   :http://storage.googleapis.com:80 "GET /knesset-data-pipelines/data/members/mk_individual/mk_individual_govministries.csv HTTP/1.1" 200 99302
[./members/mk_individual:T_1] >>> INFO    :load_resource: DEBUG   :Starting new HTTP connection (2): storage.googleapis.com:80
[./members/mk_individual:T_1] >>> INFO    :load_resource: DEBUG   :http://storage.googleapis.com:80 "GET /knesset-data-pipelines/data/members/mk_individual/mk_individual_govministries.csv HTTP/1.1" 200 99302
[./members/mk_individual:T_1] >>> INFO    :load_resource: DEBUG   :Starting new HTTP connection (1): storage.googleapis.com:80
[./members/mk_individual:T_1] >>> INFO    :load_resource: DEBUG   :http://storage.googleapis.com:80 "GET /knesset-data-pipelines/data/members/mk_individual/factions.csv HTTP/1.1" 200 21670
[./members/mk_individual:T_1] >>> INFO    :load_resource: DEBUG   :Starting new HTTP connection (2): storage.googleapis.com:80
[./members/mk_individual:T_1] >>> INFO    :load_resource: DEBUG   :http://storage.googleapis.com:80 "GET /knesset-data-pipelines/data/members/mk_individual/factions.csv HTTP/1.1" 200 21670
[./members/mk_individual:T_1] >>> INFO    :load_resource: DEBUG   :Starting new HTTP connection (1): storage.googleapis.com:80
[./members/mk_individual:T_1] >>> INFO    :load_resource: DEBUG   :http://storage.googleapis.com:80 "GET /knesset-data-pipelines/data/members/mk_individual/faction_memberships.csv HTTP/1.1" 200 126687
[./members/mk_individual:T_1] >>> INFO    :load_resource: DEBUG   :Starting new HTTP connection (2): storage.googleapis.com:80
[./members/mk_individual:T_1] >>> INFO    :load_resource: DEBUG   :http://storage.googleapis.com:80 "GET /knesset-data-pipelines/data/members/mk_individual/faction_memberships.csv HTTP/1.1" 200 126687
[./members/mk_individual:T_1] >>> INFO    :load_resource: INFO    :Processed 18342 rows
[./members/mk_individual:T_1] >>> INFO    :knesset.dump_to_path: INFO    :Processed 18342 rows
[./members/mk_individual:T_1] >>> INFO    :knesset.dump_to_sql: INFO    :Processed 18342 rows
[./members/mk_individual:T_1] >>> INFO    :e2d6f365 DONE /usr/local/lib/python3.6/site-packages/datapackage_pipelines/specs/../lib/load_resource.py
[./members/mk_individual:T_1] >>> INFO    :e2d6f365 DONE /usr/local/lib/python3.6/site-packages/datapackage_pipelines/manager/../lib/internal/sink.py
[./members/mk_individual:T_1] >>> INFO    :e2d6f365 DONE /pipelines/datapackage_pipelines_knesset/processors/dump_to_path.py
[./members/mk_individual:T_1] >>> INFO    :e2d6f365 DONE /pipelines/datapackage_pipelines_knesset/processors/dump_to_sql.py
[./members/mk_individual:T_1] >>> INFO    :e2d6f365 DONE V ./members/mk_individual {'.dpp': {'out-datapackage-url': '../data/members/mk_individual/datapackage.json'}, 'bytes': None, 'count_of_rows': 18342, 'dataset_name': '_', 'hash': 'd94b918b09316158156804fda0cbb854'}
INFO    :RESULTS:
INFO    :SUCCESS: ./committees/kns_committeesession {'bytes': None, 'count_of_rows': 74409, 'dataset_name': '_', 'hash': '69ff9c2cc04646502e81a5dc795f85ea'}
INFO    :SUCCESS: ./members/mk_individual {'bytes': None, 'count_of_rows': 18342, 'dataset_name': '_', 'hash': 'd94b918b09316158156804fda0cbb854'}

Inspect the source data

Choose a committee session ID to focus on, and make sure it has all the fields.


In [23]:
# ID of the committee session this notebook focuses on; the following cell
# filters the kns_committeesession rows down to this value.
CommitteeSessionID = 2059313

In [26]:
from dataflows import Flow, load, printer, filter_rows

def _is_selected_session(row):
    """Return True for the row whose CommitteeSessionID matches the chosen one."""
    return row['CommitteeSessionID'] == CommitteeSessionID


# Load the kns_committeesession datapackage, keep only the selected session,
# and render the surviving rows as an HTML table.
committeesession_flow = Flow(
    load('/pipelines/data/committees/kns_committeesession/datapackage.json'),
    filter_rows(_is_selected_session),
    printer(tablefmt='html'),
)

# Keep the collected results; a later cell reads the matching row from them.
committeesession_data = committeesession_flow.results()


kns_committeesession

# CommitteeSessionID (integer) Number (integer) KnessetNum (integer) TypeID (integer)TypeDesc (string) CommitteeID (integer)Location (string) SessionUrl (string) BroadcastUrl (string) StartDate (datetime) FinishDate (datetime) Note (string) LastUpdatedDate (datetime) download_crc32c (string) download_filename (string) download_filesize (integer)parts_crc32c (string) parts_filesize (integer)parts_parsed_filename (string) text_crc32c (string) text_filesize (integer)text_parsed_filename (string) topics (array) committee_name (string)
1205931346215161פתוחה2חדר הוועדה, באגף הוועדות (קדמה), קומה 3, חדר 3750http://main.knesset.gov.il/Activity/committees/Pages/AllCommitteesAgenda.aspx?Tab=3&ItemID=2059313None2002-11-19 00:00:002002-11-19 00:00:00הצעת חוק ההסדרים במשק המדינה (תיקוני חקיקה להשגת יעדי התקציב והמדיניות הכלכלית לשנת הכספים 2003, התש ...2018-10-10 11:03:06+lu4+A==files/23/4/3/430592.DOC91162x9rEiQ==194576files/2/0/2059313.csvjD1Riw==195031files/2/0/2059313.txtNoneהכספים

Download the protocol text


In [38]:
import os
import subprocess

# Relative path of the parsed protocol text, read from the row selected by the
# flow above (presumably the first row of the first resource - confirm against
# the dataflows `results()` return shape).
text_parsed_filename = committeesession_data[0][0][0]['text_parsed_filename']

# Public URL of the protocol text and the matching local path under the
# pipelines data directory.
text_url = 'https://storage.googleapis.com/knesset-data-pipelines/data/committees/meeting_protocols_text/{}'.format(text_parsed_filename)
filename = '/pipelines/data/committees/meeting_protocols_text/{}'.format(text_parsed_filename)
os.makedirs(os.path.dirname(filename), exist_ok=True)

# --fail makes curl exit non-zero on an HTTP error instead of saving the error
# page as if it were the protocol text; check=True turns that exit code into an
# exception so the notebook stops here. Passing an argument list avoids going
# through the shell.
cmd = ['curl', '--fail', '--output', filename, text_url]
subprocess.run(cmd, check=True)


  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  190k  100  190k    0     0   285k      0 --:--:-- --:--:-- --:--:--  285k

Modify the pipeline yaml to run on the selected committee session ID

Under `committee-meeting-attendees:`, set the following to parse a single meeting (and add `cache: true` to the filter step):

  - run: filter
    cache: true
    parameters:
      resources: kns_committeesession
      in:
      - CommitteeSessionID: 2059313
  - run: committee_meeting_attendees
    # parameters:
    #   filter-meeting-id: [2059313]

Delete the cache hash and run the pipeline


In [47]:
%%bash
# Abort on the first failing command: without this, a failed cd would let the
# relative-path rm -rf and the dpp run execute in the wrong directory.
set -e
cd /pipelines
# Remove the stored cache hash of the meeting-attendees output before running.
rm -rf data/people/committees/meeting-attendees/cache_hash
# KNESSET_DATASERVICE_INCREMENTAL is set to an empty value for this dpp
# invocation only.
KNESSET_DATASERVICE_INCREMENTAL= \
  dpp run --verbose \
    ./people/committee-meeting-attendees


[./people/committee-meeting-attendees:T_0] >>> INFO    :8a208434 RUNNING ./people/committee-meeting-attendees
[./people/committee-meeting-attendees:T_0] >>> INFO    :8a208434 Collecting dependencies
[./people/committee-meeting-attendees:T_0] >>> INFO    :8a208434 Running async task
[./people/committee-meeting-attendees:T_0] >>> INFO    :8a208434 Waiting for completion
[./people/committee-meeting-attendees:T_0] >>> INFO    :8a208434 Async task starting
[./people/committee-meeting-attendees:T_0] >>> INFO    :8a208434 Searching for existing caches
[./people/committee-meeting-attendees:T_0] >>> INFO    :Found cache for step 3: filter
[./people/committee-meeting-attendees:T_0] >>> INFO    :8a208434 Building process chain:
[./people/committee-meeting-attendees:T_0] >>> INFO    :- cache_loader
[./people/committee-meeting-attendees:T_0] >>> INFO    :- committee_meeting_attendees
[./people/committee-meeting-attendees:T_0] >>> INFO    :- join_committee_meeting_attendees_mks
[./people/committee-meeting-attendees:T_0] >>> INFO    :- knesset.dump_to_path
[./people/committee-meeting-attendees:T_0] >>> INFO    :- knesset.dump_to_sql
[./people/committee-meeting-attendees:T_0] >>> INFO    :- (sink)
[./people/committee-meeting-attendees:T_0] >>> INFO    :committee_meeting_attendees: INFO    :getting attendees for meeting 2059313
[./people/committee-meeting-attendees:T_0] >>> INFO    :8a208434 DONE /usr/local/lib/python3.6/site-packages/datapackage_pipelines/specs/../lib/cache_loader.py
[./people/committee-meeting-attendees:T_0] >>> INFO    :committee_meeting_attendees: INFO    :Processed 1016 rows
[./people/committee-meeting-attendees:T_0] >>> INFO    :8a208434 DONE /pipelines/people/committee_meeting_attendees.py
[./people/committee-meeting-attendees:T_0] >>> INFO    :join_committee_meeting_attendees_mks: INFO    :Processed 1 rows
[./people/committee-meeting-attendees:T_0] >>> INFO    :knesset.dump_to_path: INFO    :Processed 1 rows
[./people/committee-meeting-attendees:T_0] >>> INFO    :8a208434 DONE /pipelines/people/join_committee_meeting_attendees_mks.py
[./people/committee-meeting-attendees:T_0] >>> INFO    :knesset.dump_to_sql: INFO    :Processed 1 rows
[./people/committee-meeting-attendees:T_0] >>> INFO    :8a208434 DONE /pipelines/datapackage_pipelines_knesset/processors/dump_to_path.py
[./people/committee-meeting-attendees:T_0] >>> INFO    :8a208434 DONE /usr/local/lib/python3.6/site-packages/datapackage_pipelines/manager/../lib/internal/sink.py
[./people/committee-meeting-attendees:T_0] >>> INFO    :8a208434 DONE /pipelines/datapackage_pipelines_knesset/processors/dump_to_sql.py
[./people/committee-meeting-attendees:T_0] >>> INFO    :8a208434 DONE V ./people/committee-meeting-attendees {'.dpp': {'out-datapackage-url': '../data/people/committees/meeting-attendees/datapackage.json'}, 'bytes': None, 'count_of_rows': 1, 'dataset_name': '_', 'hash': 'b930d619b391d8f667d60cecc2a95243'}
INFO    :RESULTS:
INFO    :SUCCESS: ./people/committee-meeting-attendees {'bytes': None, 'count_of_rows': 1, 'dataset_name': '_', 'hash': 'b930d619b391d8f667d60cecc2a95243'}

Inspect the data


In [48]:
from dataflows import Flow, load, printer

# Load the meeting-attendees datapackage written by the pipeline run and
# render its rows as an HTML table.
attendees_flow = Flow(
    load('/pipelines/data/people/committees/meeting-attendees/datapackage.json'),
    printer(tablefmt='html'),
)

# Left as the cell's last expression so the value returned by process() is
# still shown as the cell output.
attendees_flow.process()


kns_committeesession

# CommitteeSessionID (integer) Number (integer) KnessetNum (integer) TypeID (integer)TypeDesc (string) CommitteeID (integer)Location (string) SessionUrl (string) BroadcastUrl (string) StartDate (datetime) FinishDate (datetime) Note (string) LastUpdatedDate (datetime) download_crc32c (string) download_filename (string) download_filesize (integer)parts_crc32c (string) parts_filesize (integer)parts_parsed_filename (string) text_crc32c (string) text_filesize (integer)text_parsed_filename (string) topics (array) committee_name (string) mks (array) invitees (array) legal_advisors (array) manager (array) financial_advisors (array) attended_mk_individual_ids (array)
1205931346215161פתוחה2חדר הוועדה, באגף הוועדות (קדמה), קומה 3, חדר 3750http://main.knesset.gov.il/Activity/committees/Pages/AllCommitteesAgenda.aspx?Tab=3&ItemID=2059313None2002-11-19 00:00:002002-11-19 00:00:00הצעת חוק ההסדרים במשק המדינה (תיקוני חקיקה להשגת יעדי התקציב והמדיניות הכלכלית לשנת הכספים 2003, התש ...2018-10-10 11:03:06+lu4+A==files/23/4/3/430592.DOC91162x9rEiQ==194576files/2/0/2059313.csvjD1Riw==195031files/2/0/2059313.txtNoneהכספים['יעקב ליצמן - היו"ר', 'אבשלום וילן', 'עופר חוגי', 'אמנון כהן', 'רחמים מלול', 'משולם נהרי'][{'name': 'חה"כ צבי הנדל'}, {'name': 'חה"כ עמיר פרץ'}, {'name': 'יעקב ניזרי-סמנכ"ל שירות התעסוקה, מש ...['שגית אפיק', 'ליאורה סידי (מתמחה)', 'אנה שניידר']['טמיר כהן'][][105, 203, 46, 207, 210, 216, 219, 222]
Out[48]:
(<datapackage.package.Package at 0x7f83c0bd1748>, {})